{
  "nbformat": 4,
  "nbformat_minor": 0,
  "metadata": {
    "colab": {
      "name": "27 - Workflow scheduling",
      "provenance": [],
      "collapsed_sections": []
    },
    "kernelspec": {
      "name": "python3",
      "display_name": "Python 3"
    },
    "accelerator": "GPU"
  },
  "cells": [
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "LjmhJ4ad9kBL"
      },
      "source": [
        "# Workflow scheduling\n",
        "\n",
        "Workflows are a simple yet powerful construct that takes a callable and returns elements. They are streaming and work on data in batches, allowing large volumes of data to be processed efficiently. When working with streaming data, workflows continually run until the data stream is exhausted. \n",
        "\n",
        "Workflows can also be scheduled to run. In this case, the workflow starts from a static set of elements that dynamically expands on each run. Examples include an API service endpoint that returns items or a directory that is polled as files come in and out.\n",
        "\n",
        "This notebook will show how to use workflow scheduling in txtai."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "8tLWvo9v-Q0u"
      },
      "source": [
        "# Install dependencies\n",
        "\n",
        "Install `txtai` and all dependencies."
      ]
    },
    {
      "cell_type": "code",
      "metadata": {
        "id": "Fa5BCjMFqVKE"
      },
      "source": [
        "%%capture\n",
        "!pip install datasets git+https://github.com/neuml/txtai#egg=txtai[workflow]"
      ],
      "execution_count": 6,
      "outputs": []
    },
    {
      "cell_type": "markdown",
      "source": [
        "# Create workflow action\n",
        "\n",
        "Workflows run a series of tasks to transform and process data. This section creates a callable object that can be used as a workflow action. Each call to the object returns the next batch of rows from a dataset."
      ],
      "metadata": {
        "id": "EJ_hHmQtRgQM"
      }
    },
    {
      "cell_type": "code",
      "metadata": {
        "id": "3hYRk9JnsM0J"
      },
      "source": [
        "from datasets import load_dataset\n",
        "\n",
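        "# Callable workflow action: each call returns the next batch of dataset rows\n",
        "class Stream:\n",
        "  def __init__(self):\n",
        "    self.dataset = load_dataset(\"ag_news\", split=\"train\")\n",
        "\n",
        "    # Current position in the dataset and number of rows per batch\n",
        "    self.index, self.size = 0, 2500\n",
        "\n",
        "  def __call__(self, fields):\n",
        "    outputs = []\n",
        "    for field in fields:\n",
        "      output = []\n",
        "\n",
        "      # Build (id, data, tags) tuples for the next batch, advancing the position row by row\n",
        "      for row in self.dataset.select(range(self.index, self.index+self.size)):\n",
        "        output.append((self.index, row[field], None))\n",
        "        self.index += 1\n",
        "\n",
        "      outputs.append(output)\n",
        "\n",
        "    return outputs"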
      ],
      "execution_count": 7,
      "outputs": []
    },
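    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "The batching behavior can be tried without downloading a dataset. The cell below is a minimal sketch, not part of txtai: `ListStream` is a hypothetical stand-in for `Stream` that reads from an in-memory list, and the rows and batch size are made up for illustration. Each call returns the next batch of `(id, data, tags)` tuples and moves the position forward, which is why repeated scheduled runs load new data each time."
      ]
    },
    {
      "cell_type": "code",
      "metadata": {},
      "source": [
        "# Hypothetical stand-in for Stream: a plain list replaces the ag_news dataset\n",
        "class ListStream:\n",
        "  def __init__(self, rows, size):\n",
        "    self.rows, self.index, self.size = rows, 0, size\n",
        "\n",
        "  def __call__(self, fields):\n",
        "    outputs = []\n",
        "    for field in fields:\n",
        "      # Take the next batch, then advance the position past it\n",
        "      batch = self.rows[self.index:self.index + self.size]\n",
        "      outputs.append([(self.index + x, row[field], None) for x, row in enumerate(batch)])\n",
        "      self.index += len(batch)\n",
        "\n",
        "    return outputs\n",
        "\n",
        "# Illustrative data: 5 rows, read in batches of 2\n",
        "rows = [{\"text\": f\"article {x}\"} for x in range(5)]\n",
        "stream = ListStream(rows, 2)\n",
        "\n",
        "first = stream([\"text\"])\n",
        "second = stream([\"text\"])\n",
        "\n",
        "print(first)\n",
        "print(second)"
      ],
      "execution_count": null,
      "outputs": []
    },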
    {
      "cell_type": "markdown",
      "source": [
        "# Build workflow\n",
        "\n",
        "Next we'll create the workflow. The workflow reads batches of data from the stream and loads them into an Embeddings index. The schedule runs it four times to demonstrate a scheduled workflow."
      ],
      "metadata": {
        "id": "_B4YFu-1R2QC"
      }
    },
    {
      "cell_type": "code",
      "source": [
        "from txtai.app import Application\n",
        "\n",
        "# Run at most once every 5 seconds, for 4 iterations\n",
        "workflow = \"\"\"\n",
        "writable: true\n",
        "embeddings:\n",
        "  path: sentence-transformers/nli-mpnet-base-v2\n",
        "  content: true\n",
        "\n",
        "workflow:\n",
        "  index:\n",
        "    schedule:\n",
        "      cron: '* * * * * 0/5'\n",
        "      elements:\n",
        "        - text\n",
        "      iterations: 4\n",
        "    tasks:\n",
        "      - __main__.Stream\n",
        "      - upsert\n",
        "\"\"\"\n",
        "\n",
        "app = Application(workflow)\n",
        "app.wait()"
      ],
      "metadata": {
        "id": "1oZag3tKWkfe",
        "colab": {
          "base_uri": "https://localhost:8080/"
        },
        "outputId": "54d6ce91-d782-421a-e092-7e78bb6ee1d0"
      },
      "execution_count": 8,
      "outputs": [
        {
          "output_type": "stream",
          "name": "stderr",
          "text": [
            "2022-02-03 02:12:06,720 [WARNING] _create_builder_config: Using custom data configuration default\n",
            "2022-02-03 02:12:06,727 [WARNING] download_and_prepare: Reusing dataset ag_news (/root/.cache/huggingface/datasets/ag_news/default/0.0.0/bc2bcb40336ace1a0374767fc29bb0296cdaf8a6da7298436239c54d79180548)\n",
            "2022-02-03 02:12:06,751 [INFO] schedule: 'index' scheduler started with schedule * * * * * 0/5\n",
            "2022-02-03 02:12:06,757 [INFO] schedule: 'index' next run scheduled for 2022-02-03T02:12:10+00:00\n",
            "2022-02-03 02:12:34,937 [INFO] schedule: 'index' next run scheduled for 2022-02-03T02:12:35+00:00\n",
            "2022-02-03 02:12:59,967 [INFO] schedule: 'index' next run scheduled for 2022-02-03T02:13:00+00:00\n",
            "2022-02-03 02:13:23,349 [INFO] schedule: 'index' next run scheduled for 2022-02-03T02:13:25+00:00\n",
            "2022-02-03 02:13:49,621 [INFO] schedule: 'index' max iterations (4) reached\n"
          ]
        }
      ]
    },
    {
      "cell_type": "markdown",
      "source": [
        "Reviewing the log above, we see the `index` job ran four times. Now let's query the index and see what was loaded."
      ],
      "metadata": {
        "id": "SC3Yf9EgSHiK"
      }
    },
    {
      "cell_type": "markdown",
      "source": [
        "# Run an embeddings search\n",
        "\n",
        "Let's run a search against the newly created index."
      ],
      "metadata": {
        "id": "5hBY2TsZSQY0"
      }
    },
    {
      "cell_type": "code",
      "source": [
        "import json\n",
        "\n",
        "# Show total number of records\n",
        "print(f\"Total records: {app.count()}\")\n",
        "\n",
        "# Run a search\n",
        "print(\"Search:\")\n",
        "print(json.dumps(app.search(\"life on mars\", limit=1), indent=2))"
      ],
      "metadata": {
        "colab": {
          "base_uri": "https://localhost:8080/"
        },
        "id": "3slP9p_KkbtD",
        "outputId": "9473aaea-eaa3-42e5-f9db-1f9f6e51f5c5"
      },
      "execution_count": 9,
      "outputs": [
        {
          "output_type": "stream",
          "name": "stdout",
          "text": [
            "Total records: 10000\n",
            "Search:\n",
            "[\n",
            "  {\n",
            "    \"id\": \"119\",\n",
            "    \"text\": \"Life on Mars Likely, Scientist Claims (SPACE.com) SPACE.com - DENVER, COLORADO -- Those twin robots hard at work on Mars have transmitted teasing views that reinforce the prospect that microbial life may exist on the red planet.\",\n",
            "    \"score\": 0.7236138582229614\n",
            "  }\n",
            "]\n"
          ]
        }
      ]
    },
    {
      "cell_type": "markdown",
      "source": [
        "The index has 10,000 records: four batches of 2,500 rows each. We also see the top result for the query `life on mars`."
      ],
      "metadata": {
        "id": "4i3H19ZmUSlK"
      }
    },
    {
      "cell_type": "markdown",
      "source": [
        "# Run a scheduled embeddings search\n",
        "\n",
        "Now let's incrementally load the dataset with a scheduled workflow and run a scheduled search after each batch is loaded."
      ],
      "metadata": {
        "id": "-t-D7yu_WZl-"
      }
    },
    {
      "cell_type": "code",
      "source": [
        "from txtai.app import Application\n",
        "\n",
        "# Run at most once every 5 seconds, for 4 iterations\n",
        "workflow = \"\"\"\n",
        "writable: true\n",
        "embeddings:\n",
        "  path: sentence-transformers/nli-mpnet-base-v2\n",
        "  content: true\n",
        "\n",
        "workflow:\n",
        "  index:\n",
        "    schedule:\n",
        "      cron: '* * * * * 0/5'\n",
        "      elements:\n",
        "        - text\n",
        "      iterations: 4\n",
        "    tasks:\n",
        "      - __main__.Stream\n",
        "      - upsert\n",
        "  search:\n",
        "    schedule:\n",
        "      cron: '* * * * * 0/5'\n",
        "      elements:\n",
        "        - life on mars\n",
        "      iterations: 4\n",
        "    tasks:\n",
        "      - action: search\n",
        "        args: [3]\n",
        "        task: console\n",
        "\"\"\"\n",
        "\n",
        "app = Application(workflow)\n",
        "app.wait()"
      ],
      "metadata": {
        "colab": {
          "base_uri": "https://localhost:8080/"
        },
        "id": "xpuwL9aCUJOd",
        "outputId": "d994aaaf-7315-4109-d024-3e09773cc538"
      },
      "execution_count": 10,
      "outputs": [
        {
          "output_type": "stream",
          "name": "stderr",
          "text": [
            "2022-02-03 02:13:55,789 [WARNING] _create_builder_config: Using custom data configuration default\n",
            "2022-02-03 02:13:55,797 [WARNING] download_and_prepare: Reusing dataset ag_news (/root/.cache/huggingface/datasets/ag_news/default/0.0.0/bc2bcb40336ace1a0374767fc29bb0296cdaf8a6da7298436239c54d79180548)\n",
            "2022-02-03 02:13:55,808 [INFO] schedule: 'index' scheduler started with schedule * * * * * 0/5\n",
            "2022-02-03 02:13:55,808 [INFO] schedule: 'search' scheduler started with schedule * * * * * 0/5\n",
            "2022-02-03 02:13:55,810 [INFO] schedule: 'index' next run scheduled for 2022-02-03T02:14:00+00:00\n",
            "2022-02-03 02:13:55,814 [INFO] schedule: 'search' next run scheduled for 2022-02-03T02:14:00+00:00\n",
            "2022-02-03 02:14:00,001 [INFO] schedule: 'search' next run scheduled for 2022-02-03T02:14:05+00:00\n"
          ]
        },
        {
          "output_type": "stream",
          "name": "stdout",
          "text": [
            "Inputs: [\n",
            "  \"life on mars\"\n",
            "]\n",
            "Outputs: [\n",
            "  null\n",
            "]\n"
          ]
        },
        {
          "output_type": "stream",
          "name": "stderr",
          "text": [
            "2022-02-03 02:14:24,500 [INFO] schedule: 'index' next run scheduled for 2022-02-03T02:14:25+00:00\n",
            "2022-02-03 02:14:24,522 [INFO] schedule: 'search' next run scheduled for 2022-02-03T02:14:25+00:00\n"
          ]
        },
        {
          "output_type": "stream",
          "name": "stdout",
          "text": [
            "Inputs: [\n",
            "  \"life on mars\"\n",
            "]\n",
            "Outputs: [\n",
            "  {\n",
            "    \"id\": \"119\",\n",
            "    \"text\": \"Life on Mars Likely, Scientist Claims (SPACE.com) SPACE.com - DENVER, COLORADO -- Those twin robots hard at work on Mars have transmitted teasing views that reinforce the prospect that microbial life may exist on the red planet.\",\n",
            "    \"score\": 0.7236138582229614\n",
            "  },\n",
            "  {\n",
            "    \"id\": \"271\",\n",
            "    \"text\": \"Saturn's Moon Titan: Prebiotic Laboratory by Harry Bortman    In this second and final part of the interview, Lunine explains how Huygens may help scientists understand the origin of life on Earth, even if it doesn't detect life on Titan.    Astrobiology Magazine -- Titan is the only moon in our solar system with an atmosphere, and it is the organic chemistry that has been detected in that atmosphere that has sparked the imagination of planetary scientists like Lunine...\",\n",
            "    \"score\": 0.4750666916370392\n",
            "  },\n",
            "  {\n",
            "    \"id\": \"1132\",\n",
            "    \"text\": \"Is Mercury the Incredible Shrinking Planet? MESSENGER Spacecraft May Find Out (SPACE.com) SPACE.com - With a new spacecraft bound for Mercury, that tiny planet nbsp;near the heart of the solar system, researchers are hoping to solve a slew of riddles about the small world.\",\n",
            "    \"score\": 0.47124743461608887\n",
            "  }\n",
            "]\n"
          ]
        },
        {
          "output_type": "stream",
          "name": "stderr",
          "text": [
            "2022-02-03 02:14:25,496 [INFO] schedule: 'search' next run scheduled for 2022-02-03T02:14:30+00:00\n"
          ]
        },
        {
          "output_type": "stream",
          "name": "stdout",
          "text": [
            "Inputs: [\n",
            "  \"life on mars\"\n",
            "]\n",
            "Outputs: [\n",
            "  {\n",
            "    \"id\": \"119\",\n",
            "    \"text\": \"Life on Mars Likely, Scientist Claims (SPACE.com) SPACE.com - DENVER, COLORADO -- Those twin robots hard at work on Mars have transmitted teasing views that reinforce the prospect that microbial life may exist on the red planet.\",\n",
            "    \"score\": 0.7236138582229614\n",
            "  },\n",
            "  {\n",
            "    \"id\": \"271\",\n",
            "    \"text\": \"Saturn's Moon Titan: Prebiotic Laboratory by Harry Bortman    In this second and final part of the interview, Lunine explains how Huygens may help scientists understand the origin of life on Earth, even if it doesn't detect life on Titan.    Astrobiology Magazine -- Titan is the only moon in our solar system with an atmosphere, and it is the organic chemistry that has been detected in that atmosphere that has sparked the imagination of planetary scientists like Lunine...\",\n",
            "    \"score\": 0.4750666916370392\n",
            "  },\n",
            "  {\n",
            "    \"id\": \"1132\",\n",
            "    \"text\": \"Is Mercury the Incredible Shrinking Planet? MESSENGER Spacecraft May Find Out (SPACE.com) SPACE.com - With a new spacecraft bound for Mercury, that tiny planet nbsp;near the heart of the solar system, researchers are hoping to solve a slew of riddles about the small world.\",\n",
            "    \"score\": 0.47124743461608887\n",
            "  }\n",
            "]\n"
          ]
        },
        {
          "output_type": "stream",
          "name": "stderr",
          "text": [
            "2022-02-03 02:14:50,112 [INFO] schedule: 'index' next run scheduled for 2022-02-03T02:14:55+00:00\n",
            "2022-02-03 02:14:50,138 [INFO] schedule: 'search' max iterations (4) reached\n"
          ]
        },
        {
          "output_type": "stream",
          "name": "stdout",
          "text": [
            "Inputs: [\n",
            "  \"life on mars\"\n",
            "]\n",
            "Outputs: [\n",
            "  {\n",
            "    \"id\": \"119\",\n",
            "    \"text\": \"Life on Mars Likely, Scientist Claims (SPACE.com) SPACE.com - DENVER, COLORADO -- Those twin robots hard at work on Mars have transmitted teasing views that reinforce the prospect that microbial life may exist on the red planet.\",\n",
            "    \"score\": 0.7236138582229614\n",
            "  },\n",
            "  {\n",
            "    \"id\": \"3300\",\n",
            "    \"text\": \"Mars Hills, Crater Yield Evidence of Flowing Water LOS ANGELES (Reuters) - The hills of Mars yielded more tantalizing clues about how water shaped the Red Planet in tests by NASA #39;s robotic geologist, Spirit, while its twin, Opportunity, observed the deep crater it climbed into two months ...\",\n",
            "    \"score\": 0.6666488647460938\n",
            "  },\n",
            "  {\n",
            "    \"id\": \"4201\",\n",
            "    \"text\": \"Martian hill shows signs of ancient water LOS ANGELES - NASA #39;s Spirit rover has found more evidence of past water on the hills of Mars, while its twin, Opportunity, has observed a field of dunes inside a crater. \",\n",
            "    \"score\": 0.6453495621681213\n",
            "  }\n",
            "]\n"
          ]
        },
        {
          "output_type": "stream",
          "name": "stderr",
          "text": [
            "2022-02-03 02:15:18,333 [INFO] schedule: 'index' next run scheduled for 2022-02-03T02:15:20+00:00\n",
            "2022-02-03 02:15:44,592 [INFO] schedule: 'index' max iterations (4) reached\n"
          ]
        }
      ]
    },
    {
      "cell_type": "markdown",
      "source": [
        "Both jobs are scheduled to run at most once every 5 seconds. Since each index run takes longer than 5 seconds (roughly 25 seconds in the log above), the actual gap between index runs is longer.\n",
        "\n",
        "The index job loads the next batch of data and the search job runs a recurring search.\n",
        "\n",
        "Note how the search results change over time. The first search returns nothing because the index is still empty, and later searches find more relevant results as more batches are loaded."
      ],
      "metadata": {
        "id": "o9EX2NgxV23x"
      }
    },
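    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "The timing of the first `index` log in this notebook can be reproduced with a small simulation. The cell below is a rough sketch using only the standard library, not txtai's scheduler: `nextrun` is a hypothetical helper that assumes the schedule fires on 5-second boundaries, and the 24-second job duration is an assumption read off that log. It illustrates why a job that takes longer than the interval ends up starting about 25 seconds apart."
      ]
    },
    {
      "cell_type": "code",
      "metadata": {},
      "source": [
        "from datetime import datetime, timedelta\n",
        "\n",
        "def nextrun(now, interval=5):\n",
        "  # Hypothetical helper: next time strictly after now whose seconds value is a multiple of interval\n",
        "  # Assumes interval divides 60 evenly\n",
        "  base = now.replace(microsecond=0)\n",
        "  return base + timedelta(seconds=interval - base.second % interval)\n",
        "\n",
        "# Assumptions: scheduler start time taken from the first log, each job takes about 24 seconds\n",
        "now, duration = datetime(2022, 2, 3, 2, 12, 6), timedelta(seconds=24)\n",
        "\n",
        "starts = []\n",
        "for _ in range(4):\n",
        "  # The next run is only scheduled once the previous run has finished\n",
        "  start = nextrun(now)\n",
        "  starts.append(start.strftime(\"%H:%M:%S\"))\n",
        "  now = start + duration\n",
        "\n",
        "print(starts)"
      ],
      "execution_count": null,
      "outputs": []
    },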
    {
      "cell_type": "markdown",
      "source": [
        "# Wrapping up\n",
        "\n",
        "This notebook covered how to use workflow scheduling with txtai. While there are existing ways to schedule jobs (system cron, serverless, and so on), built-in workflow scheduling is another quick and easy option."
      ],
      "metadata": {
        "id": "Fr99QHPtTMJt"
      }
    }
  ]
}
